﻿using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Text;
using System.Threading.Tasks;
using Core;
using Core.Model;
using log4net;
using Core.Helper;

namespace DataServer.Server
{
    public class PushService : IPushService
    {
        private static readonly ILog _log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

        public static readonly ConcurrentDictionary<String, SubscribeContext> _Subscribers = new ConcurrentDictionary<String, SubscribeContext>();
        private static readonly int _msgCount = 50;
        private static object _synObj = new  object();
        private static Queue<ArgumentBase<Event>> _msgQueue = new Queue<ArgumentBase<Event>>(_msgCount);
       
        public string ClientIpAndPort()
        {
            try
            {
                OperationContext context = OperationContext.Current;
                MessageProperties properties = context.IncomingMessageProperties;
                RemoteEndpointMessageProperty endpoint = properties[RemoteEndpointMessageProperty.Name] as RemoteEndpointMessageProperty;
                if (endpoint != null) return endpoint.Address + ":" + endpoint.Port.ToString();
            }
            catch 
            {
            }
         
            return "";
        }

        public void Subscribe(SubscribeArg a)
        {
            var callback = OperationContext.Current.GetCallbackChannel<IPushServiceCallback>();
            a.Username = a.Username.ToLower();
            _Subscribers[a.Username] = new SubscribeContext() { Arg = a, Callback = callback };

            #region 事件处理
            ICommunicationObject obj = (ICommunicationObject) callback;
            if (obj == null)
            {
                _log.Error("客户端订阅消息失败!");
                return;
            }
            _log.Info($"客户端连接成功:" + ClientIpAndPort());
         
            try
            {
                lock (_synObj)
                {
                    foreach (var argumentBase in _msgQueue)
                    {
                        //此处需要加上权限判断、订阅判断等
                        _Subscribers[a.Username].Callback.OnEventReceived(argumentBase);
                    }
                }
            }
            catch (Exception ex)
            {
                RemoveSubscriber(a.Username);
                _log.Error($"客户端{ClientIpAndPort()}信息发布:{ex.Message}");
                _log.Error($"客户端{ClientIpAndPort()}已断开连接");
            }
           
            obj.Closed += (s, e) =>
            {
                _log.Info("客户端断开连接!");
            };

            obj.Faulted += (s, e) => {

                //_log.Info($"客户端通信出错:" + ClientIpAndPort());
            };

            obj.Closing += (s, e) =>
            {
                try
                {
                    var callback2 = (IPushServiceCallback)s;

                    _Subscribers.ToList().ForEach(ent => {

                        if (ent.Value.Callback == callback2)
                        {
                            RemoveSubscriber(ent.Value.Arg.Username);
                        }
                    });
                }
                catch (Exception ex)
                {
                    _log.Error(ex.Message);
                }
            };
            #endregion

        }

        public void Unsubscribe(SubscribeArg a)
        {
            RemoveSubscriber(a.Username);

        }
        private static void RemoveSubscriber(string username)
        {
            username = username.ToLower();
            if (_Subscribers.ContainsKey(username))
            {
                _Subscribers.TryRemove(username, out _);
            }
        }

        public static void PostEvent(ArgumentBase<Event> a)
        {
            lock (_synObj)
            {
                _msgQueue.Enqueue(a, _msgCount);
            }
            _Subscribers.ToList().ForEach(subscriber =>
            {
                ICommunicationObject callback = (ICommunicationObject)subscriber.Value.Callback;
                if (((ICommunicationObject)callback).State == CommunicationState.Opened)
                {
                    try
                    {
                        //此处需要加上权限判断、订阅判断等
                        subscriber.Value.Callback.OnEventReceived( a);
                    }
                    catch 
                    {
                        RemoveSubscriber(subscriber.Value.Arg.Username);
                    }
                }
                else
                {
                    RemoveSubscriber(subscriber.Value.Arg.Username);
                }
            });
        }

        public static void ConfirmEvent(string[] ids)
        {
            lock (_synObj)
            {
                try
                {
                    if (_msgQueue.Count == 0) return;
                    List<ArgumentBase<Event>> keeps = new List<ArgumentBase<Event>>();
                    keeps.AddRange(_msgQueue.ToArray());
                    foreach (string id in ids)
                    {
                        foreach (var argumentBase in _msgQueue)
                        {
                            if (argumentBase.Model.Id == id)
                            {
                                keeps.Remove(argumentBase);
                            }
                        }
                    }
                    _msgQueue.Clear();
                    foreach (var argumentBase in keeps)
                    {
                        _msgQueue.Enqueue(argumentBase);
                    }
                }
                catch 
                {
                }
            }
        }


        #region IEventService 成员


        public DateTime Ping()
        {
            Console.WriteLine("Ping:" + ClientIpAndPort() + "," + DateTime.Now);
            return DateTime.Now;
        }

        #endregion
    }
}
